#!/usr/bin/env python # encoding: utf-8 """tracker.py -- modified from webgps.py It tracks the currently visible GPS satellites over a time period. Usage: ./tracker.py the script never exits and continuously updates the skyview. tracker.py generates a file "gpsd-c.json" with the satellite tracks. If tracker.py is interrupted with Ctrl-C before the duration is over, it saves the current tracks into the file "tracks.j". This is a JSON file. If this file is present on start of tracker.py, it is loaded. This allows to restart tracker.py without losing accumulated satellite tracks. """ # This file is Copyright by the GPSD project # SPDX-License-Identifier: BSD-2-clause from __future__ import absolute_import, print_function, division import ctypes from ctypes.util import find_library import json import math import os # import sys import gps # Untested: any recent gpsd should do C = ctypes.CDLL(find_library("c")) C.shm_open.res_type = ctypes.c_int C.shm_open.arg_types = [ctypes.c_char_p, ctypes.c_int, ctypes.c_int] C.shm_unlink.res_type = ctypes.c_int C.shm_unlink.arg_types = [ctypes.c_char_p] # os.close() # os.ftruncate # os.lseek # os.fstat TRACKMAX = 540 STALECOUNT = 10 # JSONFILE = "/tmp/gpsd-c.json" JFILE = "tracks.j" MASTER_ENCODING = "latin-1" def hypodis(polar_a, polar_b): """Calculate the wrong? distance between two polar 2D points.""" return math.hypot( (polar_a[0] - polar_b[0]), (polar_a[1] - polar_b[1]) ) class SatTracks(gps.gps): """gpsd client writing JSON output.""" def __init__(self): super(SatTracks, self).__init__(device="/dev/ttyUSB0") self.sattrack = {} # maps PRNs to Tracks self.state = None self.needsupdate = 0 self.heads = [b"", b"", b""] self.fd1 = C.shm_open( "gpsdc-tracks", os.O_CREAT | os.O_RDWR, 0o644 ) os.ftruncate(self.fd1, 1 << 16) def json(self, fd=None): # write the tracks if fd is None: return dictionary = { str(prn): self.sattrack[prn]['track'] for prn in self.sattrack.keys() } for t in self.sattrack.values(): if t['track']: dictionary[t['prn']] = t['track'] jsot = json.dumps(dictionary) os.write(fd, bytes(jsot + "\0", encoding=MASTER_ENCODING)) def make_stale(self): for t in self.sattrack.values(): if t['stale']: t['stale'] -= 1 def delete_stale(self): stales = [] for prn in self.sattrack.keys(): if self.sattrack[prn]['stale'] == 0: stales.append(prn) self.needsupdate = 1 for prn in stales: print("OLD: Deleting staled track(s) %s" % repr(prn)) del self.sattrack[prn] def insert_sat(self, prn, x, y): try: t = self.sattrack[prn] except KeyError: t = self.sattrack[prn] = { "prn": prn, "stale": STALECOUNT, "track": [], } pos = (x, y) lenny = len(t['track']) t["stale"] = STALECOUNT if 0 == lenny or 0.8 < hypodis(t["track"][0], pos) < 1.4: t["track"].insert(0, pos) lenny += 1 if lenny > TRACKMAX: print( "Dropping record: Track %d too long %d." % (prn, lenny) ) t['track'] = t['track'][:TRACKMAX] self.needsupdate = True def update_tracks(self): self.make_stale() for s in self.satellites: self.insert_sat(s.PRN, s.elevation, s.azimuth) self.delete_stale() def generate_json(self, fd): os.lseek(fd, 0, os.SEEK_SET) self.json(fd) def run(self): self.needsupdate = 1 self.stream(gps.WATCH_ENABLE | gps.WATCH_NEWSTYLE) for report in self: if report["class"] not in ("SKY"): continue if "/dev/ttyUSB0" != self.device: continue self.update_tracks() if self.needsupdate: self.generate_json(self.fd1) self.needsupdate = 0 def dump(self): with open(JFILE, "w", encoding=MASTER_ENCODING) as j: j.write(json.dumps(self.sattrack)) def restore(self): if os.path.isfile(JFILE): print("INIT: Found tracks file.") with open(JFILE, "r", encoding=MASTER_ENCODING) as j: buf = j.read() if buf: self.sattrack = json.loads(buf) def main(): sat = SatTracks() sat.restore() # restore the tracks try: sat.run() except KeyboardInterrupt: pass finally: sat.dump() # save the tracks if __name__ == "__main__": main()